package com.newtec.dataprocess.executor.jobhandler;

import cn.hutool.core.util.StrUtil;
import com.newtec.dataprocess.core.context.XxlJobContext;
import com.newtec.dataprocess.core.context.XxlJobHelper;
import com.newtec.dataprocess.core.handler.IJobHandler;
import com.newtec.dataprocess.core.handler.annotation.XxlJob;
import com.newtec.dataprocess.executor.model.KettleRepositoryParam;
import com.newtec.dataprocess.executor.model.MyJobListener;
import com.newtec.dataprocess.executor.model.MyTransListener;
import com.newtec.dataprocess.kettle.common.utils.KettleParamUtils;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author: kettle 资源库模式执行器
 * @date: 2020-09-25 10:40
 * @version: 1.0
 * @description: 功能描述
 */
@Component
public class KettleRepositoryHandler extends IJobHandler {
    /**
     * execute handler, invoked when executor receives a scheduling request
     *
     * @return
     * @throws Exception
     */
    @XxlJob(value = "repositoryHandler", init = "init", destroy = "destroy")
    @Override
    public void execute() throws Exception {
        String param = XxlJobHelper.getJobParam();
        XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
        xxlJobContext.setExtJson(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        xxlJobContext.setHandlerEnum("test");
        if (StrUtil.isEmpty(param)) {
            XxlJobHelper.log("请参照执行器管理的参数格式");
            XxlJobHelper.handleFail("请参照执行器管理的参数格式");
            return;
        }
        KettleRepositoryParam repositoryParam = KettleRepositoryParam.toKettleRepositoryParam(param);
        if (repositoryParam == null) {
            XxlJobHelper.log("请参照执行器管理的参数格式");
            XxlJobHelper.handleFail("请参照执行器管理的参数格式");
            return;
        }
        try {
            KettleDatabaseRepository repository = repositoryConn(repositoryParam);
            if (repository == null) {
                XxlJobHelper.log("连接资源库出错，请检查资源库配置！！");
                XxlJobHelper.handleFail("连接资源库出错，请检查资源库配置");
                return;
            }
            // 根据指定的字符串路径 找到目录，在 r_transformation中id_directory=0的代表根目录
            RepositoryDirectoryInterface dir = null;
            ObjectId id = null;
            dir = repository.findDirectory(repositoryParam.getRootDirectory());
            id = repository.getTransformationID(repositoryParam.getJobOrTransName(), dir);
            String message = "执行成功！！";
            if (id != null) {
                TransMeta transMeta = repository.loadTransformation(id, null);
                Trans trans = new Trans(transMeta);
                trans.addTransListener(new MyTransListener());
                //trans.setVariable("size", "5");
                KettleParamUtils.setJobOrTransParams(trans, null, null, repositoryParam.getParams());
                trans.execute(null);// 执行转换
                trans.waitUntilFinished(); // 等待转换执行结束
                if (trans.getErrors() > 0) {
                    message = "trans:" + repositoryParam.getJobOrTransName() + " kettle引擎执行报错，请检查转换是否有误！！";
                    XxlJobHelper.log(message);
                    XxlJobHelper.handleFail(message);
                    return;
                }
            } else {
                id = repository.getJobId(repositoryParam.getJobOrTransName(), dir);
                if (id != null) {
                    JobMeta jobMeta = repository.loadJob(id, null);
                    Job job = new Job(repository, jobMeta);
                    job.addJobListener(new MyJobListener());
                    // job.setVariable("size", "50");
                    KettleParamUtils.setJobOrTransParams(null, job, jobMeta, repositoryParam.getParams());
                    job.run();
                    // 等待转换执行结束
                    job.waitUntilFinished();
                    if (job.getErrors() > 0) {
                        message = "job:" + repositoryParam.getJobOrTransName() + " kettle引擎执行报错,请检查任务配置是否有误！！";
                        XxlJobHelper.log(message);
                        XxlJobHelper.handleFail(message);
                        return;
                    }
                } else {
                    message = "找不到--" + repositoryParam.getRootDirectory() + "/" + repositoryParam.getJobOrTransName() + "--转换或者任务";
                    XxlJobHelper.log(message);
                    XxlJobHelper.handleFail(message);
                    return;
                }
            }
            XxlJobHelper.handleSuccess(message);
            return;
        } catch (KettleException e) {
            e.printStackTrace();
            XxlJobHelper.log(e);
            XxlJobHelper.log("执行出错，请查看日志");
            XxlJobHelper.handleFail("执行出错，请查看日志");
            return;
        }
    }

    //线程初始化的时候调用
    @Override
    public void init() throws Exception {
        super.init();
        System.out.println("初始化...............");
    }

    //线程销毁的时候调用
    @Override
    public void destroy() throws Exception {
        super.destroy();
        this.clearRepositoryCacheMap();
        System.out.println("销毁线程...............");
    }

    public void clearRepositoryCacheMap() {
        if (repostoryCacheMap.size() > 0) {
            Iterator<Map.Entry<String, KettleDatabaseRepository>> it = repostoryCacheMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, KettleDatabaseRepository> entry = it.next();
                KettleDatabaseRepository repository = entry.getValue();
                if (repository.isConnected()) {
                    repository.disconnect();
                }
                it.remove();
            }
        }
    }

    //暂时repostory缓存，用于短时间多次执行情况线程还未结束的时候继续使用缓存
    public ConcurrentHashMap<String, KettleDatabaseRepository> repostoryCacheMap = new ConcurrentHashMap();

    /**
     * 配置资源库环境 并接连接的资源库
     *
     * @return
     * @throws KettleException
     */
    public KettleDatabaseRepository repositoryConn(KettleRepositoryParam param) throws KettleException {
        // 初始化
        //KettleEnvironment.init();
        //dbAccessTypeCode = new String[]{"Native", "ODBC", "OCI", "Plugin", "JNDI", ","};
        String key = param.getIp() + "-" + param.getDataBaseName() + "-" + param.getPort();
        if (this.repostoryCacheMap.get(key) != null) {
            return this.repostoryCacheMap.get(key);
        }
        // 数据库连接元对象
        // （kettle数据库连接名称(KETTLE工具右上角显示)，资源库类型，连接方式，IP，数据库名，端口，用户名，密码） //cgmRepositoryConn
        DatabaseMeta databaseMeta = new DatabaseMeta(key,
                "mysql",
                "Native",
                param.getIp(),
                param.getDataBaseName(),
                param.getPort(),
                param.getUserName(),
                param.getPassword());
        // 数据库形式的资源库元对象
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
        kettleDatabaseRepositoryMeta.setConnection(databaseMeta);
        // 数据库形式的资源库对象
        KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
        // 用资源库元对象初始化资源库对象
        kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
        // 连接到资源库
        kettleDatabaseRepository.connect("admin", "admin");// 默认的连接资源库的用户名和密码
        if (kettleDatabaseRepository.isConnected()) {
            System.out.println("连接成功");
            XxlJobHelper.log("连接成功");
            repostoryCacheMap.put(key, kettleDatabaseRepository);
            return kettleDatabaseRepository;
        } else {
            System.out.println("连接失败");
            XxlJobHelper.log("连接失败");
            return null;
        }
    }
}
